Recommendation Engine
Problem Statement
The input data is a file of user, item, and rating records.
The purpose of the exercise is to build a recommendation model
and then predict the affinity of users for various items.
# -*- coding: utf-8 -*-
import os
# Switch to the project directory (presumably where UserItemData.txt lives;
# verify against the deployment layout).
os.chdir("/home/cloudops/spark")
# Show the directory that is now in effect. os.curdir is only the constant
# '.' and says nothing about where the process actually is, so query the
# real working directory instead.
os.getcwd()
# Read the raw ratings file. Every line is one 'user,item,rating' record,
# i.e. the triple layout that ALS is trained on.
# NOTE(review): the relative path is resolved by Spark, not by Python --
# confirm that the earlier os.chdir() really influences where it is looked up.
ratingsData = sc.textFile("UserItemData.txt")
ratingsData.collect()
# Each record is the affinity of a user to an item: 'User,Item,Score',
# where 10 is the highest score.
# ['1001,9001,10',
# '1001,9002,1',
# '1001,9003,9',
# '1002,9001,3',
# . . .
# =====================================
# Turn each text line into a typed (user, item, rating) tuple (RDD):
# first split on the comma, then cast the ids to int and the score to float.
ratingVector = (
    ratingsData
    .map(lambda line: line.split(','))
    .map(lambda fields: (int(fields[0]), int(fields[1]), float(fields[2])))
)
# =====================================
# Wrap the parsed RDD in a SQL DataFrame with named columns
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
ratingsDf = sqlContext.createDataFrame(
    ratingVector,
    schema=["user", "item", "rating"],
)
# Resulting schema:
# DataFrame[user: bigint, item: bigint, rating: double]
# =====================================
# Train the recommendation model with ALS
# ALS - Alternating Least Squares (matrix factorization)
from pyspark.ml.recommendation import ALS
# rank=10 gives 10 latent features per user/item; the column names are
# spelled out for readability and are the same values ALS uses by default.
als = ALS(
    rank=10,
    maxIter=5,
    userCol="user",
    itemCol="item",
    ratingCol="rating",
)
# Fit on the ratings to learn the user/item affinity factors
model = als.fit(ratingsDf)
# Inspect the learned per-user feature vectors, ordered by user id
model.userFactors.orderBy("id").collect()
# [Row(id=1001, features=[-0.45539024472236633,
# 0.36960071325302124,
# 0.851772665977478,
# 0.6629341840744019,
# 0.7355863451957703,
# 0.6816525459289551,
# 0.7078778743743896,
# -0.37058207392692566,
# -0.12031808495521545,
# -0.196274533867836]),
# . . .
# =====================================
# Build a test DataSet of the (user, item) pairs to be scored.
# Question asked of the model: what is the affinity between user and item?
# =====================================
testDf = sqlContext.createDataFrame(
    [
        (1001, 9003),
        (1001, 9004),
        (1001, 9005),
    ],
    schema=["user", "item"],
)
# =====================================
# Predict
# =====================================
# Score every test pair, bring the rows back to the driver, then rank them
# from highest to lowest predicted affinity. "prediction" is the third
# column of each scored row (after user and item).
predictions = model.transform(testDf).collect()
predictions.sort(key=lambda row: row["prediction"], reverse=True)
predictions
# [Row(user=1001, item=9003, prediction=9.006633758544922),
# Row(user=1001, item=9004, prediction=-0.6739926934242249),
# Row(user=1001, item=9005, prediction=-2.546449899673462)]